package main

import (
	"bufio"
	"encoding/json"
	"flag"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"os"
	"os/exec"
	"strings"
	"time"
)

// Command-line settings. Each variable is bound to a flag in init and holds
// that flag's value once flag.Parse has run.
var (
	// k8sHost is the base URL of the Kubernetes API server (flag -host).
	k8sHost string
	// k8sPrefix is the API path segment appended to k8sHost (flag -pre).
	k8sPrefix string
	// haconfig is the path of the generated nginx configuration file (flag -ngconfig).
	haconfig string
	// hareload is the command line executed after the configuration is rewritten (flag -ngreload).
	hareload string
	// domain is the DNS suffix appended to every generated server_name (flag -domain).
	domain string
)

// PortToPubport describes the published port of one service: the port number
// and the protocol recorded for it.
type PortToPubport struct {
	// Pubport is the port that WriteCfg proxies to on every node; main fills
	// it from a service port's NodePort.
	Pubport  int
	// Protocol is the protocol name stored alongside the port. WriteCfg does
	// not consult it.
	Protocol string
}

// PordsCreater holds the set of services to publish and renders them into the
// nginx configuration (see WriteCfg). Call Init before using it.
type PordsCreater struct {
	// SvcPordsMap maps a service key (namespace + service name) to its
	// published port.
	SvcPordsMap map[string]PortToPubport
}

// Init installs a fresh, empty service map on pc. Any entries stored before
// the call are discarded.
func (pc *PordsCreater) Init() {
	pc.SvcPordsMap = map[string]PortToPubport{}
}
// Add registers svc with the given protocol and port.
//
// It returns an error and leaves the existing entry untouched when svc is
// already registered; otherwise it stores the entry and returns nil.
func (pc *PordsCreater) Add(svc string, protocal string, port int) error {
	if _, taken := pc.SvcPordsMap[svc]; taken {
		return fmt.Errorf("服务已被占用")
	}

	pc.SvcPordsMap[svc] = PortToPubport{
		Pubport:  port,
		Protocol: protocal,
	}
	return nil
}

// Delete removes svc from the service map.
//
// When svc is not registered it logs that fact and changes nothing.
func (pc *PordsCreater) Delete(svc string) {
	// The lookup result must be negated: only a missing key is the
	// "does not exist" case. An existing key falls through to the delete.
	if _, ok := pc.SvcPordsMap[svc]; !ok {
		log.Println(svc + " 不存在")
		return
	}
	delete(pc.SvcPordsMap, svc)
}

// WriteCfg regenerates the nginx configuration file at cfg from the services
// currently held in pc.
//
// For every service it writes an upstream group listing each node in nodes on
// the service's published port, followed by a server block for
// "<service key>.<domainname>" that proxies to that group. When nodes is empty
// no upstream group and no proxy_pass directive are written, because nginx
// rejects an upstream without servers.
//
// The file is truncated and rewritten on every call. Failure to create the
// file terminates the process (log.Fatal); a failure while writing is logged.
// Services are written in map iteration order, which is not stable between
// calls.
func (pc *PordsCreater) WriteCfg(nodes []string, cfg string, domainname string) {
	// Arguments: service key, domain name, proxy_pass directive (may be empty).
	const serverTmpl = `server {
    listen       80;
    server_name  %s.%s;

    error_page   500 502 503 504  /50x.html;
    location = /50x.html {
        root   /usr/share/nginx/html;
    }

    location / {
	    client_max_body_size 10G;
	    proxy_set_header X-Real-IP $remote_addr;
	    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
	    proxy_set_header Host $http_host;
	    proxy_set_header X-NginX-Proxy true;
	    %s
	    proxy_redirect off;
    }
}

`

	file, err := os.Create(cfg)
	if nil != err {
		log.Fatal("create cfg error: ", err)
	}
	defer file.Close()

	// bufio.Writer remembers the first write error, so a single check at
	// Flush covers every write below.
	w := bufio.NewWriter(file)
	for key, v := range pc.SvcPordsMap {
		var proxyPass string
		if len(nodes) > 0 {
			// nginx allows only one proxy_pass per location, so the nodes
			// are gathered in an upstream group and referenced once.
			fmt.Fprintf(w, "upstream %s {\n", key)
			for _, node := range nodes {
				fmt.Fprintf(w, "    server %s:%d;\n", node, v.Pubport)
			}
			fmt.Fprint(w, "}\n\n")
			proxyPass = "proxy_pass http://" + key + ";"
		}
		fmt.Fprintf(w, serverTmpl, key, domainname, proxyPass)
	}

	if err := w.Flush(); err != nil {
		log.Println("write cfg error:", err)
	}
}

// ServiceMetadata is the part of a service's metadata this program reads
// from the watch event JSON.
type ServiceMetadata struct {
	// Name is the service name; main uses it as the second half of the service key.
	Name      string
	// Namespace is the service namespace; main uses it as the first half of the service key.
	Namespace string
}

// ServicePort is one entry of a service spec's port list.
type ServicePort struct {
	// NodePort is the port number main stores as the service's published port.
	NodePort int
	// Protocol is the protocol name stored together with the port.
	Protocol string
}

// ServiceSpec is the part of a service spec this program reads.
// A service whose Type is "NodePort" is one that must be exposed externally;
// watchSvc forwards only those.
type ServiceSpec struct {
	// Type is the service type string, compared against "NodePort".
	Type  string
	// Ports lists the ports declared by the service.
	Ports []ServicePort
}

// ServiceObject is the service carried inside a watch event.
type ServiceObject struct {
	// Metadata identifies the service (name and namespace).
	Metadata ServiceMetadata
	// Spec holds the service type and its ports.
	Spec     ServiceSpec
}

// Service is one event decoded from a line of the services watch stream
// (see watchSvc).
type Service struct {
	// Type is the event type; main compares it against "ADDED".
	Type   string
	// Object is the service the event refers to.
	Object ServiceObject
}

// init registers the command-line flags, binding each one to its
// package-level variable, and then parses the command line.
func init() {
	stringFlags := []struct {
		target   *string
		name     string
		fallback string
		usage    string
	}{
		{&k8sHost, "host", "http://127.0.0.1:8080", "k8s 主机地址"},
		{&k8sPrefix, "pre", "api/v1", "前缀"},
		{&haconfig, "ngconfig", "/etc/nginx/conf.d/kubernetes.conf", "配置文件的默认路径"},
		{&hareload, "ngreload", "systemctl reload nginx", "配置 reload 命令"},
		{&domain, "domain", "bonc.com.cn", "域名"},
	}
	for _, f := range stringFlags {
		flag.StringVar(f.target, f.name, f.fallback, f.usage)
	}
	flag.Parse()
}

// watchSvc opens the services watch stream at
// k8sHost/k8sPrefix/watch/services and forwards every event whose service
// type is "NodePort" to ch.
//
// The stream is read as one JSON event per line. A line that cannot be decoded
// is logged and skipped. The function returns when the request fails, the
// server answers with a non-200 status, or the stream ends or breaks; the
// caller decides whether to reconnect.
func watchSvc(ch chan<- Service) {
	resp, err := http.Get(k8sHost + "/" + k8sPrefix + "/watch/services")
	if err != nil {
		log.Println("get http", err)
		return
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		// An error response body is not an event stream; do not parse it.
		log.Println("get http", resp.Status)
		return
	}
	bufrd := bufio.NewReader(resp.Body)

	for {
		// ReadBytes returns the complete line however long it is. ReadLine
		// would hand back a line longer than the reader's buffer in pieces,
		// none of which is valid JSON.
		line, err := bufrd.ReadBytes('\n')
		if len(line) > 0 {
			// A fresh value per event: decoding into a reused struct would
			// keep fields from the previous event that this one omits.
			var svc Service
			if jerr := json.Unmarshal(line, &svc); jerr != nil {
				log.Println("unmarshal service event", jerr)
			} else {
				fmt.Println(svc)
				if svc.Object.Spec.Type == "NodePort" {
					ch <- svc
				}
			}
		}
		if err != nil {
			log.Println("readline error", err)
			return
		}
	}
}

// watchService keeps the service watch running forever: whenever watchSvc
// returns (request failure or stream end), it waits five minutes and
// connects again.
func watchService(ch chan<- Service) {
	const retryDelay = 5 * time.Minute
	for {
		watchSvc(ch)
		time.Sleep(retryDelay)
	}
}

// NodeMetadata is the part of a node's metadata this program reads.
type NodeMetadata struct {
	// Name is the node name; getAllNodes returns these names.
	Name string
}
// NodeObj is one node entry of the node list response.
type NodeObj struct {
	// Metadata carries the node's name.
	Metadata NodeMetadata
}

// NodeInfo is the node list response decoded by getAllNodes.
type NodeInfo struct {
	// Kind is the kind string of the response; getAllNodes does not inspect it.
	Kind  string
	// Items holds one entry per node.
	Items []NodeObj
}

// getAllNodes fetches k8sHost/k8sPrefix/nodes and returns the name of every
// node in the response.
//
// It returns nil when the request fails, the server answers with a non-200
// status, the body cannot be read, or the body is not valid JSON. A successful
// response without nodes yields an empty, non-nil slice, so callers can tell
// "no nodes" apart from "lookup failed".
func getAllNodes() []string {
	resp, err := http.Get(k8sHost + "/" + k8sPrefix + "/nodes")
	if err != nil {
		log.Println("获取nodes失败", err)
		return nil
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		// An error body would decode into a NodeInfo without items and be
		// mistaken for an empty cluster.
		log.Println("获取nodes失败", resp.Status)
		return nil
	}

	buf, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Println("读取nodes错误", err)
		return nil
	}

	var nodeInfo NodeInfo
	if err = json.Unmarshal(buf, &nodeInfo); err != nil {
		log.Println("unmarshal nodes", err)
		return nil
	}
	fmt.Println(nodeInfo)

	// Sized from the response, so any number of nodes fits.
	ret := make([]string, 0, len(nodeInfo.Items))
	for _, node := range nodeInfo.Items {
		ret = append(ret, node.Metadata.Name)
	}
	return ret
}

// watchNodes polls getAllNodes once an hour, forever, and sends the node list
// on ch the first time a fetch succeeds and afterwards whenever the list
// differs from the one sent last.
//
// A failed fetch (nil result) is never sent: the receiver keeps working with
// the last list it was given until a later fetch succeeds.
func watchNodes(ch chan<- []string) {
	var preNodes []string
	for {
		nodes := getAllNodes()
		fmt.Println(len(nodes))
		fmt.Println(nodes)

		// nodes == nil means the lookup failed, which is not a change of
		// the cluster; getAllNodes has already logged the reason.
		if nodes != nil && (preNodes == nil || !sameNodes(preNodes, nodes)) {
			ch <- nodes
			preNodes = nodes
		}
		time.Sleep(time.Hour)
	}
}

// sameNodes reports whether cur has the same length as prev and every name in
// cur also occurs in prev.
func sameNodes(prev, cur []string) bool {
	if len(prev) != len(cur) {
		return false
	}
	for _, name := range cur {
		known := false
		for _, old := range prev {
			if name == old {
				known = true
				break
			}
		}
		if !known {
			return false
		}
	}
	return true
}

/*
 * main ties the watchers to the configuration writer.
 *
 * One goroutine (watchService) reads service changes and puts them on a
 * channel; another (watchNodes) reports changes of the node list. The loop
 * below keeps the service-to-port mapping up to date and, a few seconds after
 * the last change, rewrites the nginx configuration and runs the reload
 * command.
 */
func main() {
	var pc PordsCreater
	pc.Init()

	chNode := make(chan []string)
	go watchNodes(chNode)
	var nodes []string

	ch := make(chan Service)
	go watchService(ch)

	// Debounce timer: every change pushes the next flush a few seconds into
	// the future, so a burst of events causes a single rewrite and reload.
	t := time.NewTimer(time.Second)

	// Fields splits on any run of whitespace, so extra spaces in the reload
	// command do not become empty arguments.
	var command string
	var args []string
	if fields := strings.Fields(hareload); len(fields) > 0 {
		command, args = fields[0], fields[1:]
	}

	for {
		select {
		case s := <-ch:
			key := fmt.Sprintf("%s-%s", s.Object.Metadata.Namespace, s.Object.Metadata.Name)
			switch s.Type {
			case "ADDED", "MODIFIED":
				// Current policy: a service exposes a single port, so only
				// the first one is published. Supporting several ports per
				// service needs a different key scheme.
				ports := s.Object.Spec.Ports
				if len(ports) == 0 {
					log.Println("service has no ports, ignored:", key)
					continue
				}
				// Written directly rather than through Add, which refuses
				// to replace an existing entry: a changed or re-delivered
				// service must end up with its current port.
				pc.SvcPordsMap[key] = PortToPubport{
					Pubport:  ports[0].NodePort,
					Protocol: ports[0].Protocol,
				}
			case "DELETED":
				delete(pc.SvcPordsMap, key)
			default:
				// Anything else carries no service change to apply.
				log.Println("ignoring service event type", s.Type)
				continue
			}
			t.Reset(5 * time.Second)

		case n := <-chNode:
			nodes = n
			t.Reset(10 * time.Second)
		case <-t.C:
			log.Println("flush")
			if nodes == nil {
				// No node list yet; check again shortly.
				t.Reset(5 * time.Second)
			} else {
				pc.WriteCfg(nodes, haconfig, domain)
				log.Println(haconfig, hareload, command)
				log.Println(args)
				cmd := exec.Command(command, args...)
				err := cmd.Run()
				if err != nil {
					log.Println("hareload 命令执行失败", err)
				}
			}
		}
	}
}
